<!DOCTYPE html>
<html lang="zh-cn">
<head>
   
    <link type="text/css" rel="stylesheet" href="/bundles/blog-common.css?v=KOZafwuaDasEedEenI5aTy8aXH0epbm6VUJ0v3vsT_Q1"/>
<link id="MainCss" type="text/css" rel="stylesheet" href="/skins/ThinkInside/bundle-ThinkInside.css?v=RRjf6pEarGnbXZ86qxNycPfQivwSKWRa4heYLB15rVE1"/>
<link type="text/css" rel="stylesheet" href="/blog/customcss/428549.css?v=%2fam3bBTkW5NBWhBE%2fD0lcyJv5UM%3d"/>

</head>
<body>
<a name="top"></a>

<div id="page_begin_html"></div><script>load_page_begin_html();</script>

<div id="topics">
	<div class = "post">
		<h1 class = "postTitle">
			<a id="cb_post_title_url" class="postTitle2" href="https://www.cnblogs.com/frankdeng/p/9572017.html">Storm（三）Storm的原理机制</a>
		</h1>
		<div class="clear"></div>
		<div class="postBody">
			<div id="cnblogs_post_body" class="blogpost-body"><h2>一.Storm的数据分发策略</h2>
<h3>1. Shuffle Grouping&nbsp;</h3>
<p>随机分组，随机派发stream里面的tuple，保证每个bolt task接收到的tuple数目大致相同。 轮询，平均分配&nbsp;</p>
<h3>2. Fields Grouping</h3>
<p>按字段分组，比如，按"user-id"这个字段来分组，那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task， 而不同的"user-id"则可能会被分配到不同的task。&nbsp;</p>
<h3>3. All Grouping</h3>
<p>广播发送，对于每一个tuple，所有的bolts都会收到&nbsp;</p>
<h3>4. Global Grouping</h3>
<p>全局分组，把tuple分配给task id最低的task 。</p>
<h3>5. None Grouping</h3>
<p>不分组，这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行（未来Storm如果可能的话会这样设计）。&nbsp;</p>
<h3>6. Direct Grouping</h3>
<p>指向型分组， 这是一种比较特别的分组方法，用这种分组意味着消息（tuple）的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)&nbsp;&nbsp;</p>
<h3>7. Local or shuffle grouping</h3>
<p>本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中，tuple将会被随机发送给这些同进程中的tasks。否则，和普通的Shuffle Grouping行为一致</p>
<h3>8.customGrouping</h3>
<p>自定义，相当于mapreduce那里自己去实现一个partition一样。</p>
<h2>二.Storm的并发机制</h2>
<p>Worker &ndash; 进程</p>
<p>一个Topology拓扑会包含一个或多个Worker（每个Worker进程只能从属于一个特定的Topology） 这些Worker进程会并行跑在集群中不同的服务器上，即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成</p>
<p>Executor &ndash; 线程</p>
<p>Executor是由Worker进程中生成的一个线程 每个Worker进程中会运行拓扑当中的一个或多个Executor线程 一个Executor线程中可以执行一个或多个Task任务（默认每个Executor只执行一个Task任务），但是这些Task任务都是对应着同一个组件（Spout、Bolt）。</p>
<p>Task</p>
<p>实际执行数据处理的最小单元 每个task即为一个Spout或者一个Bolt Task数量在整个Topology生命周期中保持不变，Executor数量可以变化或手动调整 （默认情况下，Task数量和Executor是相同的，即每个Executor线程中默认运行一个Task任务）</p>
<p>设置Worker进程数</p>
<p>Config.setNumWorkers(int workers)</p>
<p>设置Executor线程数</p>
<p>TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) ，TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) ：其中， parallelism_hint即为executor线程数</p>
<p>设置Task数量</p>
<p>ComponentConfigurationDeclarer.setNumTasks(Number val)</p>
<p>例：</p>
<p>Rebalance &ndash; 再平衡</p>
<p>即，动态调整Topology拓扑的Worker进程数量、以及Executor线程数量</p>
<p>支持两种调整方式： 1、通过Storm UI 2、通过Storm CLI</p>
<p>通过Storm CLI动态调整： storm help rebalance</p>
<p>例：storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 将mytopology拓扑worker进程数量调整为5个， &ldquo; blue-spout &rdquo; 所使用的线程数量调整为3个 ，&ldquo; yellow-bolt &rdquo;所使用的线程数量调整为10个。</p>
<h2>三.Storm的通信机制</h2>
<p>Worker进程间的数据通信</p>
<p>ZMQ ZeroMQ 开源的消息传递框架，并不是一个MessageQueue Netty Netty是基于NIO的网络框架，更加高效。（之所以Storm 0.9版本之后使用Netty，是因为ZMQ的license和Storm的license不兼容。）</p>
<p>Worker内部的数据通信</p>
<p>Disruptor 实现了&ldquo;队列&rdquo;的功能。 可以理解为一种事件监听或者消息处理机制，即在队列当中一边由生产者放入消息数据，另一边消费者并行取出消息数据处理。</p>
<p>Worker内部的消息传递机制</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902222936726-1251327614.png" alt="" /></p>
<h2>四.Storm的容错机制</h2>
<p>1、集群节点宕机</p>
<p>Nimbus服务器 单点故障？ 非Nimbus服务器 故障时，该节点上所有Task任务都会超时，Nimbus会将这些Task任务重新分配到其他服务器上运行</p>
<p>2、进程挂掉</p>
<p>Worker 挂掉时，Supervisor会重新启动这个进程。如果启动过程中仍然一直失败，并且无法向Nimbus发送心跳，Nimbus会将该Worker重新分配到其他服务器上 Supervisor 无状态（所有的状态信息都存放在Zookeeper中来管理） 快速失败（每当遇到任何异常情况，都会自动毁灭） Nimbus 无状态（所有的状态信息都存放在Zookeeper中来管理） 快速失败（每当遇到任何异常情况，都会自动毁灭）</p>
<p>3、消息的完整性</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223144868-436190276.png" alt="" /></p>
<p>从Spout中发出的Tuple，以及基于他所产生Tuple（例如上个例子当中Spout发出的句子，以及句子当中单词的tuple等） 由这些消息就构成了一棵tuple树 当这棵tuple树发送完成，并且树当中每一条消息都被正确处理，就表明spout发送消息被&ldquo;完整处理&rdquo;，即消息的完整性</p>
<p>Acker -- 消息完整性的实现机制 Storm的拓扑当中特殊的一些任务 负责跟踪每个Spout发出的Tuple的DAG（有向无环图）</p>
<h2>五.Storm的DRPC</h2>
<p>DRPC (Distributed RPC) 分布式远程过程调用</p>
<p>DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。 DRPC Server 负责接收 RPC 请求，并将该请求发送到 Storm中运行的 Topology，等待接收 Topology 发送的处理结果，并将该结果返回给发送请求的客户端。 （其实，从客户端的角度来说，DPRC 与普通的 RPC 调用并没有什么区别。）</p>
<p>DRPC设计目的： 为了充分利用Storm的计算能力实现高密度的并行实时计算。 （Storm接收若干个数据流输入，数据在Topology当中运行完成，然后通过DRPC将结果进行输出。）</p>
<p>客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果，并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器，根据函数调用的 id 来将函数调用的结果返回。</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223541400-2107585017.png" alt="" /></p>
<p>定义DRPC拓扑：</p>
<p>方法1： 通过LinearDRPCTopologyBuilder （该方法也过期，不建议使用） 该方法会自动为我们设定Spout、将结果返回给DRPC Server等，我们只需要将Topology实现</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223652473-159248705.png" alt="" /></p>
<p>方法2： 直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑 需要手动设定好开始的DRPCSpout以及结束的ReturnResults</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223719054-1735851736.png" alt="" /></p>
<p>运行模式：</p>
<p>1、本地模式</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223754991-1945428128.png" alt="" /></p>
<p>2.远程模式（集群模式）</p>
<p>修改配置文件conf/storm.yaml drpc.servers: - "node21&ldquo; 启动DRPC Server bin/storm drpc &amp; 通过StormSubmitter.submitTopology提交拓扑</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902223909426-1309143765.png" alt="" /></p>
<h2>六.Storm的事务</h2>
<p>事务性拓扑（Transactional Topologies）</p>
<p>保证消息（tuple）被且仅被处理一次</p>
<p>Design 1</p>
<p>强顺序流（强有序） 引入事务（transaction）的概念，每个transaction（即每个tuple）关联一个transaction id。 Transaction id从1开始，每个tuple会按照顺序+1。 在处理tuple时，将处理成功的tuple结果以及transaction id同时写入数据库中进行存储。</p>
<p>两种情况：</p>
<p>1、当前transaction id与数据库中的transaction id不一致</p>
<p>2、两个transaction id相同</p>
<p>缺点： 一次只能处理一个tuple，无法实现分布式计算</p>
<p>Design 2</p>
<p>强顺序的Batch流</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902224226917-680574680.png" alt="" /></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>事务（transaction）以batch为单位，即把一批tuple称为一个batch，每次处理一个batch。 每个batch（一批tuple）关联一个transaction id ，每个batch内部可以并行计算</p>
<p>缺点</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201809/1385722-20180902224322820-571543915.png" alt="" /></p>
<p>Design 3</p>
<p>Storm's design</p>
<p>将Topology拆分为两个阶段：</p>
<p>1、Processing phase 允许并行处理多个batch</p>
<p>2、Commit phase 保证batch的强有序，一次只能处理一个batch</p>
<p>Design details</p>
<p>Manages state - 状态管理</p>
<p>Storm通过Zookeeper存储所有transaction相关信息（包含了：当前transaction id 以及batch的元数据信息）</p>
<p>Coordinates the transactions - 协调事务</p>
<p>Storm会管理决定transaction应该处理什么阶段（processing、committing）</p>
<p>Fault detection - 故障检测</p>
<p>Storm内部通过Acker机制保障消息被正常处理（用户不需要手动去维护）</p>
<p>First class batch processing API</p>
<p>Storm提供batch bolt接口</p>
<p>三种事务：</p>
<p>1、普通事务</p>
<p>2、Partitioned Transaction - 分区事务</p>
<p>3、Opaque Transaction - 不透明分区事务</p></div>

</body>
</html>
